///|
async fn[T, E : Error] async_suspend(
  cb : ((T) -> Unit, (E) -> Unit) -> Unit,
) -> T raise E = "%async.suspend"

///|
fn run_async(f : async () -> Unit noraise) = "%async.run"


///|
priv enum State {
  Done
  Fail(Error)
  Running
  Suspend(ok_cont~ : (Unit) -> Unit, err_cont~ : (Error) -> Unit)
}

///|
struct Coroutine {
  coro_id : Int
  mut state : State
  mut shielded : Bool
  mut cancelled : Bool
  mut ready : Bool
  downstream : Map[Int, Coroutine]
}

///|
pub impl Eq for Coroutine with equal(c1, c2) {
  c1.coro_id == c2.coro_id
}

///|
pub impl Hash for Coroutine with hash_combine(self, hasher) {
  self.coro_id.hash_combine(hasher)
}

///|
pub fn Coroutine::wake(self : Coroutine) -> Unit {
  self.ready = true
  scheduler.run_later.push_back(self)
}

///|
pub fn Coroutine::run(self : Coroutine) -> Unit {
  self.ready = true
  scheduler.run_later.push_front(self)
}

///|
pub fn Coroutine::is_done(self : Coroutine) -> Bool {
  match self.state {
    Done => true
    Fail(_) => true
    Running | Suspend(_) => false
  }
}

///|
pub fn is_being_cancelled() -> Bool {
  current_coroutine().cancelled
}

///|
pub fn current_coroutine_done() -> Bool {
  guard scheduler.curr_coro is Some(coro) else { return true }
  coro.is_done()
}

///|
pub(all) suberror Cancelled derive(Show)

///|
pub fn Coroutine::cancel(self : Coroutine) -> Unit {
  self.cancelled = true
  if not(self.shielded || self.ready) {
    self.wake()
  }
}

///|
pub async fn pause() -> Unit {
  guard scheduler.curr_coro is Some(coro)
  if coro.cancelled && not(coro.shielded) {
    raise Cancelled::Cancelled
  }
  async_suspend(fn(ok_cont, err_cont) {
    guard coro.state is Running
    coro.state = Suspend(ok_cont~, err_cont~)
    coro.ready = true
    scheduler.run_later.push_back(coro)
  })
}

///|
pub async fn suspend() -> Unit raise {
  guard scheduler.curr_coro is Some(coro)
  if coro.cancelled && not(coro.shielded) {
    raise Cancelled::Cancelled
  }
  scheduler.blocking += 1
  defer {
    scheduler.blocking -= 1
  }
  async_suspend(fn(ok_cont, err_cont) {
    guard coro.state is Running
    coro.state = Suspend(ok_cont~, err_cont~)
  })
}

///|
pub fn spawn(f : async () -> Unit raise) -> Coroutine {
  scheduler.coro_id += 1
  let coro = {
    state: Running,
    ready: true,
    shielded: false,
    downstream: {},
    coro_id: scheduler.coro_id,
    cancelled: false,
  }
  fn run(_) {
    run_async(fn() {
      coro.shielded = false
      try f() catch {
        err => coro.state = Fail(err)
      } noraise {
        _ => coro.state = Done
      }
      for _, coro in coro.downstream {
        coro.wake()
      }
      coro.downstream.clear()
    })
  }

  coro.state = Suspend(ok_cont=run, err_cont=_ => ())
  scheduler.run_later.push_back(coro)
  coro
}

///|
pub fn Coroutine::unwrap(self : Coroutine) -> Unit raise {
  match self.state {
    Done => ()
    Fail(err) => raise err
    Running | Suspend(_) => panic()
  }
}

///|
pub async fn Coroutine::wait(target : Coroutine) -> Unit raise {
  guard scheduler.curr_coro is Some(coro)
  guard not(physical_equal(coro, target))
  match target.state {
    Done => return
    Fail(err) => raise err
    Running | Suspend(_) => ()
  }
  target.downstream[coro.coro_id] = coro
  try suspend() catch {
    err => {
      target.downstream.remove(coro.coro_id)
      raise err
    }
  } noraise {
    _ => target.unwrap()
  }
}

///|
pub async fn protect_from_cancel(f : async () -> Unit raise) -> Unit raise {
  guard scheduler.curr_coro is Some(coro)
  if coro.shielded {
    // already in a shield, do nothing
    f()
  } else {
    coro.shielded = true
    defer {
      coro.shielded = false
    }
    f()
    if coro.cancelled {
      raise Cancelled::Cancelled
    }
  }
}


///|
priv struct Scheduler {
  mut coro_id : Int
  mut curr_coro : Coroutine?
  mut blocking : Int
  run_later : @deque.Deque[Coroutine]
}

///|
let scheduler : Scheduler = {
  coro_id: 0,
  curr_coro: None,
  blocking: 0,
  run_later: @deque.new(),
}

///|
pub fn current_coroutine() -> Coroutine {
  scheduler.curr_coro.unwrap()
}

///|
pub fn no_more_work() -> Bool {
  scheduler.blocking == 0 && scheduler.run_later.is_empty()
}

///|
pub fn rschedule() -> Unit {
  while scheduler.run_later.pop_front() is Some(coro) {
    coro.ready = false
    guard coro.state is Suspend(ok_cont~, err_cont~) else {  }
    coro.state = Running
    let last_coro = scheduler.curr_coro
    scheduler.curr_coro = Some(coro)
    if coro.cancelled && !coro.shielded {
      err_cont(Cancelled::Cancelled)
    } else {
      ok_cont(())
    }
    scheduler.curr_coro = last_coro
  }
}
